package com.atguigu.chapter11;

import com.atguigu.chapter05.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.time.Duration;

/**
 * Legacy Table API connector example: reads sensor readings from a socket,
 * converts them to a {@code Table}, and writes them to a Kafka topic in JSON
 * format via a registered sink table (old-planner {@code connect()} descriptor API).
 *
 * @author cjp
 * @version 1.0
 * @date 2021/3/12 9:30
 */
public class Flink06_TableAPI_Connector_KafkaSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Source: lines of "id,ts,vc" from a local socket, parsed into WaterSensor POJOs,
        // with a 3s bounded-out-of-orderness watermark on the event timestamp (seconds -> ms).
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
//                .readTextFile("input/sensor.csv")
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        // Split the CSV line: id, timestamp (s), water level
                        String[] line = value.split(",");
                        return new WaterSensor(line[0], Long.parseLong(line[1]), Integer.parseInt(line[2]));
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((value, ts) -> value.getTs() * 1000L)
                );


        // TODO - External-system connector: WRITE to Kafka (this class is a sink example)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useOldPlanner()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        Table inputTable = tableEnv.fromDataStream(sensorDS);

        // Register a Kafka-backed sink table; "universal" targets Kafka >= 1.0 clients.
        tableEnv
                .connect(new Kafka()
                        .topic("flink0923")
                        .version("universal")
                        .property(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092")
                )
//                .withFormat(new Csv())
                .withFormat(new Json())
                .withSchema(new Schema()
                                .field("id", DataTypes.STRING())
                                .field("ts", DataTypes.BIGINT())
                                .field("vc", DataTypes.INT())
                )
                .createTemporaryTable("kafkaSink");

        // executeInsert submits its own job for the INSERT INTO pipeline.
        inputTable.executeInsert("kafkaSink");

        // NOTE(review): executeInsert() above already triggers job submission; in Flink 1.12
        // this extra env.execute() typically fails with "No operators defined in streaming
        // topology" — confirm against the target Flink version and consider removing it.
        env.execute();
    }
}

/**
 * 版本兼容性问题
 *  1、如果kafka的版本比较老，0.11及之前，那么使用 OldPlanner
 *  2、如果kafka的版本比较新，1.0之后，使用 BlinkPlanner
 */
